
module Stalking
  # Reserves jobs from the configured beanstalkd servers and dispatches them
  # to the handlers registered on a Handlers instance.
  #
  # A job body is parsed as JSON and destructured into +name+ and +args+
  # (i.e. it is expected to be a JSON array of the form [name, args]). The
  # job handler registered under +name+ is run between the before and after
  # handlers; jobs without a registered handler are skipped. In both cases the
  # work loop deletes the job afterwards.
  #
  # The signals listed in SHUTDOWN_SIGNALS terminate the process immediately
  # when no job is being processed; otherwise the current job is finished
  # first and the work loop stops afterwards.
  class Consumer
    # Lower bound (in seconds) for the per-job time limit. Timeout.timeout
    # treats 0 as "no timeout", so without this bound a job whose TTR is 1
    # would run without any time limit.
    MIN_JOB_TIMEOUT = 0.5

    # Signals that request a shutdown of the work loop.
    SHUTDOWN_SIGNALS = %w[QUIT USR2].freeze

    # @param options [Hash]
    #   :servers - list of beanstalkd addresses (default: ["localhost:11300"])
    #   :logger  - object responding to +fatal+ (default: Stalking::NullLogger)
    # @param block evaluated in the context of the Handlers instance to
    #   register handlers. When a block is given, the work loop is started
    #   right away and this call only returns once the loop stops.
    def initialize(options = {}, &block)
      @servers = options[:servers] || ["localhost:11300"]
      @logger = options[:logger] || Stalking::NullLogger.new
      @handlers = Handlers.new
      @locked = false
      @stop_requested = false

      return unless block_given?

      @handlers.instance_eval(&block)

      work
    end

    # Calls every handler in +handlers+ with the job name and its arguments.
    #
    # @param handlers [Enumerable] callables accepting (name, args)
    # @param name job name taken from the job body
    # @param args job arguments taken from the job body
    def handle(handlers, name, args)
      handlers.each do |handler|
        handler.call name, args
      end
    end

    # Passes the error +e+ raised for job +name+ to every registered error
    # handler. An error raised by an error handler itself is logged via
    # +fatal+ and not propagated; the remaining error handlers are skipped.
    def handle_error(e, name, args)
      @handlers.error_handlers.each do |handler|
        handler.call e, name, args
      end
    rescue => handler_failure
      @logger.fatal handler_failure
    end

    private

    # Main loop: reserve a job, process it, delete it.
    #
    # On EOFError or Beanstalk::NotConnected the error is logged, the
    # connection is re-established after a one second pause and the loop is
    # restarted. Any other error is logged and re-raised.
    def work
      looping do
        job = connection.reserve

        lock do
          perform job

          job.delete
        end
      end
    rescue EOFError, Beanstalk::NotConnected => e
      @logger.fatal e

      sleep 1

      reconnect

      # Restarts the method body. The stop flag lives in an instance variable,
      # so a shutdown requested before the failure is still honored.
      retry
    rescue => e
      @logger.fatal e

      raise
    end

    # Parses the job body and runs the before handlers, the job handler and
    # the after handlers under a time limit derived from the job's TTR.
    #
    # Beanstalk::NotConnected is re-raised so that the work loop can
    # reconnect; timeouts and all other StandardErrors are passed to the
    # error handlers.
    def perform(job)
      name, args = JSON.parse(job.body)

      handler = @handlers.job_handlers[name]

      return unless handler

      begin
        Timeout.timeout(job_timeout(job)) do
          handle @handlers.before_handlers, name, args

          handler.call args

          handle @handlers.after_handlers, name, args
        end
      rescue Beanstalk::NotConnected
        raise
      rescue Timeout::Error, StandardError => e
        handle_error e, name, args
      end
    end

    # Time limit for a job: one second less than its TTR, but never below
    # MIN_JOB_TIMEOUT, since a limit of 0 would disable the timeout.
    def job_timeout(job)
      [job.ttr - 1, MIN_JOB_TIMEOUT].max
    end

    # Installs the shutdown signal handlers and yields until a shutdown has
    # been requested.
    def looping
      SHUTDOWN_SIGNALS.each do |signal|
        trap(signal) do
          # No job in progress: terminate right away.
          exit unless @locked

          # Job in progress: let it finish, then leave the loop.
          @stop_requested = true
        end
      end

      yield until @stop_requested
    end

    # Marks the consumer as busy while the block runs, so that the signal
    # handlers defer the shutdown instead of exiting.
    def lock
      @locked = true

      yield
    ensure
      @locked = false
    end

    # Returns the memoized Beanstalk::Pool, creating it if necessary, and
    # watches one tube per registered job handler name. Retries once per
    # second, logging each failure, while Beanstalk::NotConnected is raised.
    #
    # NOTE(review): the watch calls are issued on every invocation, i.e. also
    # on every +connection+ lookup in the work loop. Presumably this is cheap
    # and idempotent in the client library - confirm before hoisting it.
    def connect
      @connection ||= Beanstalk::Pool.new(@servers)

      @handlers.job_handlers.keys.each { |name| @connection.watch name }

      @connection
    rescue Beanstalk::NotConnected => e
      @logger.fatal e

      sleep 1

      retry
    end

    alias :connection :connect

    # Drops the memoized connection and establishes a new one.
    def reconnect
      @connection = nil

      connect
    end
  end
end

